# Source code for nlp_architect.pipelines.spacy_np_annotator

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import pickle
from os import path

import numpy as np
import spacy
from spacy.tokens import Doc
from spacy.tokens import Span

from nlp_architect.models.chunker import SequenceChunker
from nlp_architect.utils.generic import pad_sentences
from nlp_architect.utils.io import validate_existing_filepath
from nlp_architect.utils.text import extract_nps, Stopwords


class NPAnnotator(object):
    """
    Spacy based NP annotator - uses models.SequenceChunker model for annotation

    Instances are callable on a spacy ``Doc`` and store the detected noun
    phrase spans in the ``doc._.noun_phrases`` extension attribute.

    Args:
        model (SequenceChunker): a chunker model
        word_vocab (Vocabulary): word-id vocabulary of the model
        char_vocab (Vocabulary): char id vocabulary of words of the model
        chunk_vocab (Vocabulary): chunk tag vocabulary of the model
        batch_size (int, optional): inference batch size
    """

    def __init__(self, model, word_vocab, char_vocab, chunk_vocab, batch_size: int = 32):
        self.model = model
        self.bs = batch_size
        self.word_vocab = word_vocab
        self.char_vocab = char_vocab
        self.chunk_vocab = chunk_vocab
        # Register the custom ``noun_phrases`` attribute on Doc; force=True
        # overwrites a previous registration instead of raising.
        Doc.set_extension('noun_phrases', default=[], force=True)

    @classmethod
    def load(cls, model_path: str, parameter_path: str, batch_size: int = 32,
             use_cudnn: bool = False):
        """
        Load a NPAnnotator annotator

        Both paths are joined onto the directory of this source file before
        being validated and opened.

        Args:
            model_path (str): path to trained model
            parameter_path (str): path to model parameters
            batch_size (int, optional): inference batch_size
            use_cudnn (bool, optional): use gpu for inference (cudnn cells)

        Returns:
            NPAnnotator class with loaded model
        """
        base_dir = path.dirname(path.realpath(__file__))
        _model_path = path.join(base_dir, model_path)
        validate_existing_filepath(_model_path)
        _parameter_path = path.join(base_dir, parameter_path)
        validate_existing_filepath(_parameter_path)

        model = SequenceChunker(use_cudnn=use_cudnn)
        model.load(_model_path)
        # NOTE: unpickling can execute arbitrary code; only load parameter
        # files that come from a trusted source.
        with open(_parameter_path, 'rb') as fp:
            model_params = pickle.load(fp)
        word_vocab = model_params['word_vocab']
        chunk_vocab = model_params['chunk_vocab']
        # the char vocabulary is optional in the parameter file
        char_vocab = model_params.get('char_vocab', None)
        return cls(model, word_vocab, char_vocab, chunk_vocab, batch_size)

    def _infer_chunks(self, input_vec, doc_lengths):
        """
        Run the chunker on the given input and extract noun phrase indexes

        Args:
            input_vec: model input as built by ``__call__``
            doc_lengths (list): number of tokens of each sentence, in order

        Returns:
            the result of ``extract_nps`` over the chunk tags of all sentences
        """
        tagged_sents = self.model.predict(input_vec, batch_size=self.bs).argmax(2)
        # Keep only the last ``length`` predictions of every row, i.e. the
        # positions that hold real tokens.
        # NOTE(review): this presumes the padding sits at the front of each row - confirm
        # against pad_sentences.
        sentences = [tags[-length:] for tags, length in zip(tagged_sents, doc_lengths)]
        doc = np.concatenate(sentences)
        chunk_tags = [self.chunk_vocab.id_to_word(w) for w in doc]
        return extract_nps(chunk_tags)

    def _feature_extractor(self, doc):
        """
        Convert a list of token strings into model input features

        Vocabulary lookups that return None are replaced with id 1.

        Args:
            doc (list): token strings of a single sentence

        Returns:
            a numpy array of word ids, or a tuple of (word ids, padded char
            ids) when a char vocabulary is set
        """
        word_ids = []
        for w in doc:
            _wid = self.word_vocab[w]  # single lookup per word
            word_ids.append(_wid if _wid is not None else 1)
        features = np.asarray(word_ids)

        if self.char_vocab:
            sentence_chars = []
            for w in doc:
                word_chars = []
                for c in w:
                    _cid = self.char_vocab[c]
                    word_chars.append(_cid if _cid is not None else 1)
                sentence_chars.append(word_chars)
            sentence_chars = pad_sentences(sentence_chars, self.model.max_word_len)
            features = (features, sentence_chars)
        return features

    def __call__(self, doc: Doc) -> Doc:
        """
        Annotate the document with noun phrase spans

        Args:
            doc (Doc): a spacy document with sentence boundaries set

        Returns:
            the same document, with its ``noun_phrases`` extension set
        """
        if len(doc) < 1:
            return doc

        doc_vecs = []
        doc_chars = []
        doc_lens = []
        # one feature vector (and optionally one char matrix) per sentence
        for sentence in doc.sents:
            features = self._feature_extractor([t.text for t in sentence])
            if isinstance(features, tuple):
                doc_vec = features[0]
                doc_chars.append(features[1])
            else:
                doc_vec = features
            doc_vecs.append(doc_vec)
            doc_lens.append(len(doc_vec))

        # NOTE(review): np.asarray over sentences of different lengths builds
        # a ragged array, which recent NumPy releases reject unless
        # dtype=object is given - confirm what pad_sentences accepts before
        # changing this call.
        doc_vectors = pad_sentences(np.asarray(doc_vecs))
        inputs = doc_vectors
        if self.char_vocab:
            max_len = doc_vectors.shape[1]
            padded_chars = np.zeros((len(doc_chars), max_len, self.model.max_word_len))
            for idx, d in enumerate(doc_chars):
                d = d[:max_len]
                # write the sentence at the end of its slot; leading rows stay zero
                padded_chars[idx, -d.shape[0]:] = d
            inputs = [inputs, padded_chars]

        np_indexes = self._infer_chunks(inputs, doc_lens)
        # each (start, end) pair is used as token offsets into the document
        spans = [Span(doc, s, e) for s, e in np_indexes]
        spans = _NPPostprocessor.process(spans)
        set_noun_phrases(doc, spans)
        return doc
def get_noun_phrases(doc: Doc) -> [Span]:
    """
    Read the noun phrase annotations stored on a spacy document.

    Args:
        doc (Doc): a spacy type document

    Returns:
        the value of the document's ``noun_phrases`` extension attribute
        (a list of noun phrase Span objects)

    Raises:
        AssertionError: if the document has no ``noun_phrases`` attribute
    """
    extensions = doc._
    assert hasattr(extensions, 'noun_phrases'), 'no noun_phrase attributes in document'
    return extensions.noun_phrases
def set_noun_phrases(doc: Doc, nps: [Span]) -> None:
    """
    Store noun phrase annotations on a spacy document.

    Args:
        doc (Doc): a spacy type document
        nps ([Span]): a list of Spans

    Raises:
        AssertionError: if the document has no ``noun_phrases`` attribute
    """
    extensions = doc._
    assert hasattr(extensions, 'noun_phrases'), 'no noun_phrase attributes in document'
    extensions.set('noun_phrases', nps)
class _NPPostprocessor:
    """Cleans noun phrase candidates by applying ``post_processing_rules``."""

    @classmethod
    def process(cls, noun_phrases: [Span]) -> [Span]:
        """
        Post-process every phrase and keep the non-empty results.

        Args:
            noun_phrases ([Span]): candidate noun phrase spans

        Returns:
            list of processed spans; phrases that were rejected (None) or
            reduced to zero length are left out
        """
        kept = []
        for candidate in noun_phrases:
            cleaned = _NPPostprocessor._phrase_process(candidate)
            if cleaned is None or len(cleaned) <= 0:
                continue
            kept.append(cleaned)
        return kept

    @classmethod
    def _phrase_process(cls, phrase: Span) -> Span:
        """
        Run the rule list over a phrase repeatedly until a full pass leaves
        it unchanged.

        A pass is cut short as soon as a rule returns None.
        """
        previous = None
        while phrase != previous:
            previous = phrase
            # every rule entry is (function, *extra positional arguments)
            for rule, *rule_args in post_processing_rules:
                phrase = rule(phrase, *rule_args)
                if phrase is None:
                    break
        return phrase


def _filter_repeating_nonalnum(phrase, length):
    """
    Reject a phrase that contains ``length`` consecutive tokens whose
    ``is_alpha`` flag is False.

    Returns None for a rejected phrase, otherwise the phrase itself.
    """
    if len(phrase) <= 0:
        return phrase
    remaining = length
    for token in phrase:
        # an alphabetic token resets the countdown, any other token advances it
        remaining = length if token.is_alpha else remaining - 1
        if remaining == 0:
            return None
    return phrase


def _filter_long_phrases(phrase, word_length, phrase_length):
    """
    Reject a phrase whose longest token exceeds ``word_length`` while the
    phrase itself has more than ``phrase_length`` tokens.
    """
    token_count = len(phrase)
    if token_count <= 0:
        return phrase
    longest_token = max(len(token) for token in phrase)
    if longest_token > word_length and token_count > phrase_length:
        return None
    return phrase


def _remove_non_alphanum_from_start(phrase):
    """Drop the first token of a multi-token phrase when it is not alphabetic."""
    drop_first = len(phrase) > 1 and not phrase[0].is_alpha
    return phrase[1:] if drop_first else phrase


def _remove_non_alphanum_from_end(phrase):
    """Drop the last token of a multi-token phrase when it is not alphabetic."""
    drop_last = len(phrase) > 1 and not phrase[-1].is_alpha
    return phrase[:-1] if drop_last else phrase


def _remove_stop_words(phrase):
    """Strip stop words from both ends of the phrase."""

    def _is_stop(token):
        # spacy's own flag first, then the Stopwords word list
        return token.is_stop or str(token).strip().lower() in Stopwords.get_words()

    while len(phrase) > 0 and _is_stop(phrase[0]):
        phrase = phrase[1:]
    while len(phrase) > 0 and _is_stop(phrase[-1]):
        phrase = phrase[:-1]
    return phrase


def _remove_char_at_start(phrase):
    """Strip leading tokens whose text is one of a fixed set of strings."""
    chars = ['@', '-', '=', '.', ':', '+', '?', 'nt', '\"', '\'', '\'S', '\'s', ',']
    if not phrase or len(phrase) <= 0:
        return phrase
    while len(phrase) > 0 and phrase[0].text in chars:
        phrase = phrase[1:]
    return phrase


def _remove_char_at_end(phrase):
    """Strip trailing tokens whose text is one of a fixed set of strings."""
    chars = [',', '(', ')', ' ', '-']
    if not phrase:
        return phrase
    while len(phrase) > 0 and phrase[-1].text in chars:
        phrase = phrase[:-1]
    return phrase


def _remove_pos_from_start(phrase):
    """Strip leading tokens with one of the listed coarse POS or fine tags."""
    tag_list = ['WDT', 'PRP$', ':']
    pos_list = ['PUNCT', 'INTJ', 'NUM', 'PART', 'ADV', 'DET', 'PRON', 'VERB']
    if not phrase:
        return phrase
    while len(phrase) > 0 and (phrase[0].pos_ in pos_list or phrase[0].tag_ in tag_list):
        phrase = phrase[1:]
    return phrase


def _remove_pos_from_end(phrase):
    """Strip trailing tokens with one of the listed coarse POS or fine tags."""
    tag_list = ['WDT', ':']
    pos_list = ['DET', 'PUNCT', 'CONJ']
    if not phrase:
        return phrase
    while len(phrase) > 0 and (phrase[-1].pos_ in pos_list or phrase[-1].tag_ in tag_list):
        phrase = phrase[:-1]
    return phrase


def _filter_single_pos(phrase):
    """Reject a one-token phrase whose token is a verb, adjective or adverb."""
    pos_list = ['VERB', 'ADJ', 'ADV']
    is_rejected = phrase and len(phrase) == 1 and phrase[0].pos_ in pos_list
    if is_rejected:
        return None
    return phrase


def _filter_fp_nums(phrase):
    """Reject a phrase whose text, with commas removed, parses as a float."""
    if len(phrase) <= 0:
        return phrase
    try:
        # check for float number
        float(phrase.text.replace(',', ''))
    except ValueError:
        return phrase
    return None


def _filter_single_char(phrase):
    """Reject a phrase made of exactly one token of length one."""
    is_rejected = phrase and len(phrase) == 1 and len(phrase[0]) == 1
    if is_rejected:
        return None
    return phrase


def _filter_empty(phrase):
    """Reject a missing phrase, a zero-length phrase or whitespace-only text."""
    if phrase is None or len(phrase) == 0:
        return None
    text = phrase.text
    if len(text) == 0 or len(str(text).strip()) == 0:
        return None
    return phrase


# Rules are applied in this order; each entry is the rule function followed
# by the extra positional arguments passed after the phrase.
post_processing_rules = [
    (_filter_single_char,),
    (_filter_single_pos,),
    (_remove_pos_from_start,),
    (_remove_pos_from_end,),
    (_remove_stop_words,),
    (_remove_non_alphanum_from_start,),
    (_remove_non_alphanum_from_end,),
    (_filter_repeating_nonalnum, 5),
    (_filter_long_phrases, 5, 75),
    (_remove_char_at_start,),
    (_remove_char_at_end,),
    (_filter_fp_nums,),
    (_filter_empty,),
]
class SpacyNPAnnotator(object):
    """
    Simple Spacy pipe with NP extraction annotations

    Builds a spacy pipeline made of a sentencizer, the tagger of the loaded
    spacy model and an ``NPAnnotator`` as the last component.

    Args:
        model_path (str): path to the trained chunker model
        settings_path (str): path to the model parameters file
        spacy_model (str, optional): name of the spacy model to load
        batch_size (int, optional): inference batch size
        use_cudnn (bool, optional): use gpu for inference (cudnn cells)
    """

    def __init__(self, model_path, settings_path, spacy_model='en', batch_size=32,
                 use_cudnn=False):
        # both paths are joined onto the directory of this source file
        base_dir = path.dirname(path.realpath(__file__))
        _model_path = path.join(base_dir, model_path)
        validate_existing_filepath(_model_path)
        _settings_path = path.join(base_dir, settings_path)
        validate_existing_filepath(_settings_path)

        nlp = spacy.load(spacy_model)
        # Iterate over a snapshot of the names because pipes are removed
        # from the pipeline inside the loop; only the tagger is kept.
        for pipe_name in list(nlp.pipe_names):
            if pipe_name != 'tagger':
                nlp.remove_pipe(pipe_name)
        nlp.add_pipe(nlp.create_pipe('sentencizer'), first=True)
        # Pass the paths that were just validated, so the file that gets
        # loaded is exactly the one that was checked.
        annotator = NPAnnotator.load(_model_path, _settings_path, batch_size=batch_size,
                                     use_cudnn=use_cudnn)
        nlp.add_pipe(annotator, last=True)
        self.nlp = nlp

    def __call__(self, text: str) -> [str]:
        """
        Parse a given text and return a list of noun phrases found

        Args:
            text (str): a text string

        Returns:
            list of noun phrases as strings
        """
        # loop variable deliberately not named ``np`` to avoid shadowing numpy
        return [phrase.text for phrase in get_noun_phrases(self.nlp(text))]